/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***********************************************************************
 *   $Id: arithmeticcolumn.cpp 9679 2013-07-11 22:32:03Z zzhu $
 *
 *
 ***********************************************************************/
#include <string>
#include <exception>
#include <stdexcept>
#include <sstream>
using namespace std;

#include "arithmeticcolumn.h"
#include "constantcolumn.h"
#include "simplecolumn.h"
#include "operator.h"
#include "bytestream.h"
using namespace messageqcpp;

#include "objectreader.h"
#include "expressionparser.h"
#include "calpontselectexecutionplan.h"
#include "treenodeimpl.h"
#include "functioncolumn.h"
#include "aggregatecolumn.h"
#include "windowfunctioncolumn.h"

namespace
{
/** print the tree
 *
 * this function is mostly for debug purpose
 */
void walkfn(const execplan::ParseTree* n, ostream& output)
{
  output << *(n->data()) << endl;
}

}  // namespace

namespace execplan
{
/**
 *  Constructors/Destructors
 */
ArithmeticColumn::ArithmeticColumn() : ReturnedColumn(), fExpression(0)
{
}

ArithmeticColumn::ArithmeticColumn(const string& sql, const uint32_t sessionID)
 : ReturnedColumn(sessionID), fData(sql), fExpression(0)
{
  buildTree();
}

ArithmeticColumn::ArithmeticColumn(const ArithmeticColumn& rhs, const uint32_t sessionID)
 : ReturnedColumn(rhs, sessionID)
 , fTableAlias(rhs.fTableAlias)
 , fAsc(rhs.fAsc)
 , fData(rhs.fData)
 , fExpression(new ParseTree(*(rhs.expression())))
{
  fAlias = rhs.fAlias;
  fSimpleColumnList.clear();
  fExpression->walk(getSimpleCols, &fSimpleColumnList);
  fAggColumnList.clear();
  fExpression->walk(getAggCols, &fAggColumnList);
  fWindowFunctionColumnList.clear();
  fExpression->walk(getWindowFunctionCols, &fWindowFunctionColumnList);
}

ArithmeticColumn::~ArithmeticColumn()
{
  if (fExpression != NULL)
    delete fExpression;

  fExpression = NULL;
}

/**
 * Methods
 */

void ArithmeticColumn::expression(ParseTree*& expression)
{
  if (fExpression != NULL)
    delete fExpression;

  fExpression = expression;
  expression = 0;
}

void ArithmeticColumn::buildTree()
{
  CalpontSelectExecutionPlan::Parser parser;
  vector<Token> tokens;
  Token t;

  string::size_type i = 0;

  // string fData = ReturnedColumn::data();

  try
  {
    while (fData[i])
    {
      if (isdigit(fData[i]) || fData[i] == '.')
      {
        string num;

        while (isdigit(fData[i]) || fData[i] == '.')
        {
          num.push_back(fData[i++]);
        }

        ConstantColumn* cc = new ConstantColumn(num, ConstantColumn::NUM);
        t.value = cc;

        tokens.push_back(t);
        continue;
      }
      else if (fData[i] == '+' || fData[i] == '-' || fData[i] == '*' || fData[i] == '/' || fData[i] == '^' ||
               fData[i] == '(' || fData[i] == ')')
      {
        // t.is_operator now indicate the previous token type
        // if prev token is operand, then this '(' is func_open
        // otherwise, this '(' is open
        if (fData[i] == '(' && fData[i + 1] != '-' && !t.is_operator())
        {
          // open '('
          Operator* op1 = new Operator("(");
          t.value = op1;
          tokens.push_back(t);

          // This is not complete... we shouldn't be creating TreeNodes
          string param = nextToken(++i, ')');

          TreeNode* tn = new TreeNodeImpl(param);
          t.value = tn;

          tokens.push_back(t);

          // close ')'
          Operator* op2 = new Operator(")");
          t.value = op2;
          tokens.push_back(t);
          continue;
        }

        string op;

        // Bug 319 fix. recover '^' to '||'
        if (fData[i] == '^')
          op = "||";
        else
          op.push_back(fData[i]);

        Operator* oper = new Operator(op);
        t.value = oper;

        tokens.push_back(t);
        ++i;

        // t.is_operator now indicate the previous token type
        // if prev token is operand, then this '(' is func_open
        // otherwise, this '(' is open
        // @bug 241 fix. check (-n_nationkey) case
        if (fData[i] == '(' && fData[i] != '-' && !t.is_operator())
        {
          // This is not complete... we shouldn't be creating TreeNodes
          string param = nextToken(++i, ')');
          TreeNode* sc = new TreeNodeImpl(param);
          t.value = sc;

          tokens.push_back(t);

          // close ')'
          Operator* oper = new Operator(")");
          t.value = oper;
          tokens.push_back(t);
        }

        continue;
      }

      else if (isalpha(fData[i]) || fData[i] == '_')
      {
        string identifier;

        while (isalnum(fData[i]) || fData[i] == '_' || fData[i] == '.')
        {
          identifier.push_back(fData[i++]);
        }

        SimpleColumn* sc = new SimpleColumn(identifier, fSessionID);
        t.value = sc;

        tokens.push_back(t);
        continue;
      }

      else if (fData[i] == '\'')
      {
        string literal = nextToken(++i, '\'');
        ConstantColumn* cc = new ConstantColumn(literal, ConstantColumn::LITERAL);
        t.value = cc;

        tokens.push_back(t);
        continue;
      }

      ++i;
    }

    fExpression = parser.parse(tokens.begin(), tokens.end());
  }
  catch (const invalid_argument& e)
  {
    // clean up tokens
    for (unsigned int i = 0; i < tokens.size(); i++)
    {
      delete tokens[i].value;
      tokens[i].value = 0;
    }

    throw runtime_error(e.what());
  }
}

const string ArithmeticColumn::nextToken(string::size_type& pos, char end) const
{
  string token;
  // string fData = ReturnedColumn::data();

  // increment num when get '(' and decrement when get ')'
  // to find the mathing ')' when num = 0
  int num = 1;

  for (; pos < fData.length();)
  {
    if (end == ')')
    {
      if (fData[pos] == '(')
        num++;
      else if (fData[pos] == ')')
        num--;

      if (num == 0)
      {
        pos++;
        return token;
      }
    }
    else
    {
      if (fData[pos] == end)
      {
        pos++;
        return token;
      }
    }

    token.push_back(fData[pos++]);
  }

  string msg = "No ";
  msg.append(1, end);
  msg.append(" found in " + fData);
  throw invalid_argument(msg);
  return {};
}

ostream& operator<<(ostream& output, const ArithmeticColumn& rhs)
{
  output << rhs.toString();
  return output;
}

const string ArithmeticColumn::toString() const
{
  ostringstream oss;
  oss << "ArithmeticColumn: ";

  if (fAlias.length() > 0)
    oss << "Alias: " << fAlias << endl;

  oss << "expressionId=" << fExpressionId << endl;
  oss << "joinInfo=" << fJoinInfo << " returnAll=" << fReturnAll << " sequence#=" << fSequence << endl;
  oss << "resultType=" << colDataTypeToString(fResultType.colDataType) << "|" << fResultType.colWidth << endl;
  if (fExpression != 0)
    fExpression->walk(walkfn, oss);

  return oss.str();
}

string ArithmeticColumn::toCppCode(IncludeSet& includes) const
{
  includes.insert("arithmeticcolumn.h");
  stringstream ss;
  ss << "ArithmeticColumn(" << std::quoted(fData) << ", " << sessionID() << ")";

  return ss.str();
}

void ArithmeticColumn::serialize(messageqcpp::ByteStream& b) const
{
  b << static_cast<ObjectReader::id_t>(ObjectReader::ARITHMETICCOLUMN);
  ReturnedColumn::serialize(b);
  ObjectReader::writeParseTree(fExpression, b);
  b << fTableAlias;
  b << fData;
  b << (uint8_t)fAsc;
}

void ArithmeticColumn::unserialize(messageqcpp::ByteStream& b)
{
  ObjectReader::checkType(b, ObjectReader::ARITHMETICCOLUMN);
  ReturnedColumn::unserialize(b);

  if (fExpression != NULL)
    delete fExpression;

  fExpression = ObjectReader::createParseTree(b);
  b >> fTableAlias;
  b >> fData;
  b >> (uint8_t&)fAsc;

  fSimpleColumnList.clear();
  fExpression->walk(getSimpleCols, &fSimpleColumnList);
  fAggColumnList.clear();
  fExpression->walk(getAggCols, &fAggColumnList);
  fWindowFunctionColumnList.clear();
  fExpression->walk(getWindowFunctionCols, &fWindowFunctionColumnList);
}

bool ArithmeticColumn::operator==(const ArithmeticColumn& t) const
{
  const ReturnedColumn *rc1, *rc2;

  rc1 = static_cast<const ReturnedColumn*>(this);
  rc2 = static_cast<const ReturnedColumn*>(&t);

  if (*rc1 != *rc2)
    return false;

  if (fExpression != NULL && t.fExpression != NULL)
  {
    if (*fExpression != *t.fExpression)
      return false;
  }
  else if (fExpression != NULL || t.fExpression != NULL)
    return false;

  if (fData != t.fData)
    return false;

  return true;
}

bool ArithmeticColumn::operator==(const TreeNode* t) const
{
  const ArithmeticColumn* o;

  o = dynamic_cast<const ArithmeticColumn*>(t);

  if (o == NULL)
    return false;

  return *this == *o;
}

bool ArithmeticColumn::operator!=(const ArithmeticColumn& t) const
{
  return (!(*this == t));
}

bool ArithmeticColumn::operator!=(const TreeNode* t) const
{
  return (!(*this == t));
}

bool ArithmeticColumn::hasAggregate()
{
  if (fHasAggregate)
    return true;

  fAggColumnList.clear();
  fExpression->walk(getAggCols, &fAggColumnList);

  if (!fAggColumnList.empty())
    fHasAggregate = true;

  return fHasAggregate;
}

bool ArithmeticColumn::hasWindowFunc()
{
  fWindowFunctionColumnList.clear();
  fExpression->walk(getWindowFunctionCols, &fWindowFunctionColumnList);

  if (fWindowFunctionColumnList.empty())
    return false;

  return true;
}

void ArithmeticColumn::setDerivedTable()
{
  if (hasAggregate())
  {
    fDerivedTable = "";
    return;
  }

  if (fExpression)
  {
    fExpression->setDerivedTable();
    fDerivedTable = fExpression->derivedTable();
  }
}

void ArithmeticColumn::replaceRealCol(std::vector<SRCP>& derivedColList)
{
  if (fExpression)
    replaceRefCol(fExpression, derivedColList);
}

void ArithmeticColumn::setSimpleColumnList()
{
  fSimpleColumnList.clear();
  fExpression->walk(getSimpleCols, &fSimpleColumnList);
}

bool ArithmeticColumn::singleTable(CalpontSystemCatalog::TableAliasName& tan)
{
  tan.clear();
  setSimpleColumnList();

  for (uint32_t i = 0; i < fSimpleColumnList.size(); i++)
  {
    CalpontSystemCatalog::TableAliasName stan(
        fSimpleColumnList[i]->schemaName(), fSimpleColumnList[i]->tableName(),
        fSimpleColumnList[i]->tableAlias(), fSimpleColumnList[i]->viewName());

    if (tan.table.empty())
      tan = stan;
    else if (stan != tan)
      return false;
  }

  return true;
}

}  // namespace execplan
